---
title: Lakehouse with Pandas and PySpark | Dagster
description: How to use Lakehouse to construct a Dagster pipeline with multiple compute options.
---

# Lakehouse with Pandas and PySpark <Experimental />

<CodeReferenceLink filePath="examples/multi_type_lakehouse" />

Lakehouse is an experimental API built on top of Dagster's core abstractions that makes it easy to
define computations in terms of the data assets that they produce. In the [previous example](/guides/dagster/lakehouse), we demonstrated how to use Lakehouse to turn data assets into a core Dagster pipeline. In this example, we'll use Lakehouse to construct a Dagster pipeline with multiple compute options. If you are unfamiliar with Lakehouse, read the [first Lakehouse guide](/guides/dagster/lakehouse) before continuing.

Different computable assets will often ingest data in different ways. Let's say we have a table of temperature samples collected in 5 minute increments, and we want to compute the high temperature for each day represented in the table. In addition, we want to compute the difference between the high temperatures of consecutive days.

This pipeline will ingest a csv file as a pandas dataframe and output the computed high temperatures as a csv file. It will then ingest the computed high temperatures as a Spark dataframe and output the differences as csv files.

To do this, we first need a storage mechanism that is digestible by Spark. In this case, that means representing tables as _directories_ of csv files as opposed to a single csv file. From these directories of csv files, we must be able to load both pandas dataframes and Spark dataframes. How do we utilize both the pandas conversion and the Spark conversion?

Luckily, Lakehouse provides a way to _compose_ different types of storage. To see how, we'll define storage for converting between a folder of csv files and a pandas dataframe, and then do the same for a Spark dataframe.

In the code examples below, `PandasDF` refers to the pandas dataframe class (`from pandas import DataFrame as PandasDF`), and `SparkDF` refers to the Spark dataframe class (`from pyspark.sql import DataFrame as SparkDF`).

## Data Assets

We'll use Assets to define each of the tables.

```python file=../../multi_type_lakehouse/multi_type_lakehouse/assets.py
"""Asset definitions for the multi_type_lakehouse example."""
import pandas as pd
from lakehouse import Column, computed_table, source_table
from pandas import DataFrame as PandasDF
from pyarrow import date32, float64, string
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f

sfo_q2_weather_sample_table = source_table(
    path=("sfo_q2_weather_sample",),
    columns=[Column("tmpf", float64()), Column("valid_date", string())],
)


@computed_table(
    input_assets=[sfo_q2_weather_sample_table],
    columns=[Column("valid_date", date32()), Column("max_tmpf", float64())],
)
def daily_temperature_highs_table(sfo_q2_weather_sample: PandasDF) -> PandasDF:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@computed_table(
    input_assets=[daily_temperature_highs_table],
    columns=[Column("valid_date", date32()), Column("day_high_diff", float64())],
)
def daily_temperature_high_diffs_table(daily_temperature_highs: SparkDF) -> SparkDF:
    """Computes the difference between each day's high and the previous day's high"""
    window = Window.orderBy("valid_date")
    return daily_temperature_highs.select(
        "valid_date",
        (
            daily_temperature_highs["max_tmpf"]
            - f.lag(daily_temperature_highs["max_tmpf"]).over(window)
        ).alias("day_high_diff"),
    )
```

`sfo_q2_weather_sample_table` represents our base temperature table. The `path` argument gives
the path to the data asset itself, and the `columns` argument declares the table's columns and
their types.

`daily_temperature_highs_table` represents our computed high temperatures. We explicitly define the
dependency on the original table by passing `sfo_q2_weather_sample_table` as the value for the
`input_assets` argument.

We have an additional table `daily_temperature_high_diffs_table` that
represents the difference between the high temperature of consecutive days. We use the
`input_assets` parameter to make explicit the dependency on `daily_temperature_highs_table`.
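
To make the two computations concrete, here is a minimal sketch of the same logic in plain Python, with no pandas or Spark involved. The sample rows and the helper names `daily_highs` and `daily_high_diffs` are made up for illustration and are not part of the example project.

```python
from datetime import datetime

# Hypothetical samples: (timestamp string, temperature in Fahrenheit).
samples = [
    ("2018-04-01 00:00", 55.0),
    ("2018-04-01 12:00", 63.0),
    ("2018-04-02 00:05", 54.0),
    ("2018-04-02 14:00", 60.1),
    ("2018-04-03 13:00", 66.0),
]


def daily_highs(rows):
    """Return {date: highest temperature seen on that date}."""
    highs = {}
    for timestamp, tmpf in rows:
        day = datetime.strptime(timestamp, "%Y-%m-%d %H:%M").date()
        highs[day] = max(tmpf, highs.get(day, tmpf))
    return highs


def daily_high_diffs(highs):
    """Return [(date, high minus the previous row's high)], ordered by date.

    The first date has no predecessor, so its difference is None,
    mirroring what a lag over an ordered window produces.
    """
    days = sorted(highs)
    diffs = [(days[0], None)] if days else []
    for previous, current in zip(days, days[1:]):
        diffs.append((current, highs[current] - highs[previous]))
    return diffs


highs = daily_highs(samples)
diffs = daily_high_diffs(highs)
```

As in the Spark version, the difference is taken against the previous row in date order, so a gap in the data would compare non-adjacent calendar days.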

## Storage

The ingestion of csv files is a bit different this time, because our specification requires
ingesting a folder of csv files as opposed to a single csv file.

```python file=../../multi_type_lakehouse/multi_type_lakehouse/lakehouse_def.py startafter=start_lakehouse_def_marker_0 endbefore=end_lakehouse_def_marker_0
class LocalFileSystem:
    def __init__(self, config):
        self._root = config["root"]

    def get_fs_path(self, path: Tuple[str, ...]) -> str:
        return os.path.join(self._root, *(path[:-1]), path[-1])


local_filesystem_config_schema = {"root": StringSource}


@resource(config_schema=local_filesystem_config_schema)
def pandas_df_local_filesystem_storage(init_context):
    local_fs = LocalFileSystem(init_context.resource_config)

    class Storage(AssetStorage):
        def save(self, obj: PandasDF, path: Tuple[str, ...], _resources) -> None:
            """This saves the dataframe as a CSV using the layout written and expected by Spark/Hadoop.

            E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", a directory
            will be created with two files inside it:

                /a/b/c/
                    part-00000.csv
                    _SUCCESS
            """
            directory = local_fs.get_fs_path(path)
            os.makedirs(directory, exist_ok=True)
            open(os.path.join(directory, "_SUCCESS"), "wb").close()
            csv_path = os.path.join(directory, "part-00000.csv")
            obj.to_csv(csv_path)

        def load(self, _python_type, path: Tuple[str, ...], _resources):
            """This reads a dataframe from a CSV using the layout written and expected by Spark/Hadoop.

            E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", and that
            directory contains:

                /a/b/c/
                    part-00000.csv
                    part-00001.csv
                    _SUCCESS

            then the produced dataframe will contain the concatenated contents of the two CSV files.
            """
            fs_path = os.path.abspath(local_fs.get_fs_path(path))
            paths = glob.glob(os.path.join(fs_path, "*.csv"))
            return pd.concat(map(pd.read_csv, paths))

    return Storage()
```

The `load` method reads every csv file within a given directory, rather than taking
a csv file or set of csv files explicitly. Analogously, the `save` method writes its output into a
directory: a single part file, `part-00000.csv`, alongside a `_SUCCESS` marker file.
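
The directory layout can be exercised without pandas or Spark. The following is a rough sketch using only the standard library; `save_rows` and `load_rows` are hypothetical helpers, not part of Lakehouse, and the second part file is written by hand to stand in for what Spark might produce.

```python
import csv
import glob
import os
import tempfile


def save_rows(directory, header, rows):
    """Write rows as a single part file plus an empty _SUCCESS marker."""
    os.makedirs(directory, exist_ok=True)
    with open(os.path.join(directory, "part-00000.csv"), "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    open(os.path.join(directory, "_SUCCESS"), "wb").close()


def load_rows(directory):
    """Concatenate the rows of every *.csv part file, in sorted file order."""
    rows = []
    for part in sorted(glob.glob(os.path.join(directory, "*.csv"))):
        with open(part, newline="") as f:
            rows.extend(csv.DictReader(f))
    return rows


with tempfile.TemporaryDirectory() as root:
    table_dir = os.path.join(root, "daily_temperature_highs")
    save_rows(table_dir, ["valid_date", "max_tmpf"], [["2018-04-01", "63.0"]])
    # Simulate a second part file of the kind a multi-partition write produces.
    with open(os.path.join(table_dir, "part-00001.csv"), "w", newline="") as f:
        csv.writer(f).writerows([["valid_date", "max_tmpf"], ["2018-04-02", "60.1"]])
    loaded = load_rows(table_dir)
    files = sorted(os.listdir(table_dir))
```

Sorting the glob results keeps the row order deterministic, since `glob.glob` makes no ordering guarantee. The `_SUCCESS` file is skipped on load because it does not match `*.csv`.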

We'll do something similar for conversion between a directory of csv files and a Spark dataframe:

```python file=../../multi_type_lakehouse/multi_type_lakehouse/lakehouse_def.py startafter=start_lakehouse_def_marker_1 endbefore=end_lakehouse_def_marker_1
@resource(config_schema=local_filesystem_config_schema)
def spark_df_local_filesystem_storage(init_context):
    local_fs = LocalFileSystem(init_context.resource_config)

    class Storage(AssetStorage):
        def save(self, obj: SparkDF, path: Tuple[str, ...], _resources):
            obj.write.format("csv").options(header="true").save(
                local_fs.get_fs_path(path), mode="overwrite"
            )

        def load(self, _python_type, path, resources):
            return (
                resources.pyspark.spark_session.read.format("csv")
                .options(header="true")
                .load(local_fs.get_fs_path(path))
            )

    return Storage()
```

To compose these two storages, one per in-memory dataframe type, we use the
`multi_type_asset_storage` function provided by Lakehouse:

```python
from lakehouse import multi_type_asset_storage
```

We can now define a composed AssetStorage:

```python file=../../multi_type_lakehouse/multi_type_lakehouse/lakehouse_def.py startafter=start_lakehouse_def_marker_2 endbefore=end_lakehouse_def_marker_2
local_file_system_storage = multi_type_asset_storage(
    local_filesystem_config_schema,
    {SparkDF: spark_df_local_filesystem_storage, PandasDF: pandas_df_local_filesystem_storage},
)
```
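
Conceptually, a composed storage picks a delegate based on the in-memory type an asset asks for. The sketch below illustrates that idea with hypothetical classes and built-in types standing in for the dataframe classes. It is an assumption about the general pattern, not Lakehouse's actual implementation.

```python
from typing import Any, Dict, Tuple


class ListStorage:
    """Hypothetical storage that loads a table as a list of rows."""

    def load(self, path: Tuple[str, ...]):
        return ["row from " + "/".join(path)]


class TupleStorage:
    """Hypothetical storage that loads a table as a tuple of rows."""

    def load(self, path: Tuple[str, ...]):
        return ("row from " + "/".join(path),)


class MultiTypeStorage:
    """Delegates to the storage registered for the requested in-memory type."""

    def __init__(self, storages_by_type: Dict[type, Any]):
        self._storages_by_type = storages_by_type

    def load(self, python_type: type, path: Tuple[str, ...]):
        try:
            storage = self._storages_by_type[python_type]
        except KeyError:
            raise TypeError(
                f"No storage registered for {python_type.__name__}"
            ) from None
        return storage.load(path)


# list and tuple play the roles of PandasDF and SparkDF here.
storage = MultiTypeStorage({list: ListStorage(), tuple: TupleStorage()})
as_list = storage.load(list, ("daily_temperature_highs",))
as_tuple = storage.load(tuple, ("daily_temperature_highs",))
```

Both delegates read the same path, which is why the on-disk layout has to be one that every registered storage understands.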

Finally, we construct our lakehouse:

```python file=../../multi_type_lakehouse/multi_type_lakehouse/lakehouse_def.py startafter=start_lakehouse_def_marker_3 endbefore=end_lakehouse_def_marker_3
def make_multi_type_lakehouse():
    dev_mode = ModeDefinition(
        resource_defs={
            "pyspark": pyspark_resource,
            "default_storage": local_file_system_storage.configured({"root": "."}),
        },
    )

    return Lakehouse(
        mode_defs=[dev_mode],
        in_memory_type_resource_keys={SparkDF: ["pyspark"]},
    )


multi_type_lakehouse = make_multi_type_lakehouse()
```

## Pipeline

As in the previous example, the data assets and the storage that handles conversion completely
define our computation graph.
To construct a pipeline from these assets:

```python file=../../multi_type_lakehouse/multi_type_lakehouse/pipelines.py
"""Pipeline definitions for the multi_type_lakehouse example.
"""
from multi_type_lakehouse.assets import (
    daily_temperature_high_diffs_table,
    daily_temperature_highs_table,
)
from multi_type_lakehouse.lakehouse_def import multi_type_lakehouse

computed_assets = [daily_temperature_highs_table, daily_temperature_high_diffs_table]
multi_type_lakehouse_pipeline = multi_type_lakehouse.build_pipeline_definition(
    "multi_type_lakehouse_pipeline",
    computed_assets,
)
```

Note that the assets don't have to be provided in order. Lakehouse is able to determine asset
ordering by resolving input asset dependencies.
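
Resolving an execution order from declared dependencies is a topological sort. As a rough illustration only (this is not how Lakehouse is implemented internally, and the string names are stand-ins for the asset objects), the standard library's `graphlib` can order the three tables from this example:

```python
from graphlib import TopologicalSorter  # available in Python 3.9+

# Each asset maps to the set of assets it depends on, listed in no particular order.
dependencies = {
    "daily_temperature_high_diffs": {"daily_temperature_highs"},
    "daily_temperature_highs": {"sfo_q2_weather_sample"},
    "sfo_q2_weather_sample": set(),
}

# static_order() yields every asset after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```

Because the three tables form a single chain, there is exactly one valid order: the source table, then the highs, then the diffs.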
